/*
 * @Description:
 * @Version: 1.0
 * @Autor: ycb
 * @Date: 2022-03-16 09:07:58
 * @LastEditors: ycb
 * @LastEditTime: 2022-03-30 19:11:07
 */
package rabiitmq

import (
	"log"
	"time"

	"github.com/streadway/amqp"
)

type RabbitMQPublishSubscribeClient struct {
	// 用户
	User string
	// 密码
	Pwd string
	// ip
	IP string
	// port
	Port string
	// channel
	channel *amqp.Channel
	// 关闭标志
	close bool
}

func (client *RabbitMQPublishSubscribeClient) connect() bool {
	// 默认关闭是是未关闭状态
	client.close = true
	url := "amqp://" + client.User + ":" + client.Pwd + "@" + client.IP + ":" + client.Port + "/"
	// 新建一个连接
	connect, err := amqp.Dial(url)
	if err != nil {
		log.Printf("%s: %s", "Failed to connect to RabbitMQ", err)
		return false
	}

	// 打开一个channel
	ch, err := connect.Channel()
	if err != nil {
		log.Printf("%s: %s", "Failed to open a channel", err)
		ch.Close()
		return false
	}

	client.channel = ch
	return true
}

func (client *RabbitMQPublishSubscribeClient) SendMsg(queue_name string, bytes []byte) bool {

	// 声明或者创建一个队列用来保存消息
	err := client.channel.ExchangeDeclare(
		queue_name, // name
		"fanout",   // type
		false,      // durable
		false,      // auto-deleted
		false,      // internal
		false,      // no-wait
		nil,        // arguments
	)

	if err != nil {
		log.Printf("%s: %s", "Failed to declare a queue", err)
		return false
	}

	err = client.channel.Publish(
		queue_name, // exchange
		"",         // routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        bytes,
		})

	if err != nil {
		log.Printf("%s: %s", "Failed to publish a message", err)
		return false
	}

	return true

}

func (client *RabbitMQPublishSubscribeClient) Close() {
	client.close = false
}

func (client *RabbitMQPublishSubscribeClient) RecvMsg(exchange string, queue_name string, stream_chan chan []byte) bool {
	for {
		rabbitmq_key := time.Now().Format("2006-01-02 15:04:05")
		tmp_queue_name := queue_name + " connect time:" + rabbitmq_key
		connect_err := client.connect()
		if !connect_err {
			continue
		}

		err := client.channel.ExchangeDeclare(
			exchange, // name
			"fanout", // type
			false,    // durable
			false,    // auto-deleted
			false,    // internal
			false,    // no-wait
			nil,      // arguments
		)

		if err != nil {
			log.Printf("%s: %s", "Failed to declare an exchange", err)
			continue
		}

		q, err := client.channel.QueueDeclare(
			tmp_queue_name, // name
			false,          // durable
			false,          // delete when unused
			true,           // exclusive
			false,          // no-wait
			nil,            // arguments
		)

		if err != nil {
			log.Printf("%s: %s name: %s", "Failed to declare a queue", err, tmp_queue_name)
			continue
		}

		err = client.channel.QueueBind(
			tmp_queue_name, // queue name
			"",             // routing key
			exchange,       // exchange
			false,
			nil,
		)

		if err != nil {
			log.Printf("%s: %s name: %s", "Failed to bind a queue", err, tmp_queue_name)
			continue
		}

		msgs, err := client.channel.Consume(
			q.Name, // queue
			"",     // consumer
			true,   // auto-ack
			false,  // exclusive
			false,  // no-local
			false,  // no-wait
			nil,    // args
		)

		if err != nil {
			log.Printf("%s: %s", "Failed to register a consumer", err)
			continue
		}

		for d := range msgs {
			// 把body发送出去
			stream_chan <- d.Body
		}
	}
}
